fix(low-code): Fix legacy state migration in SubstreamPartitionRouter#261
Conversation
📝 WalkthroughWalkthroughThe changes focus on improving the Changes
Sequence DiagramsequenceDiagram
participant SR as SubstreamPartitionRouter
participant IS as Initial State
SR->>IS: set_initial_state()
IS-->>SR: Validate state type
alt Valid State
SR->SR: Copy state to parent streams
else Invalid State
SR->SR: Ignore state
end
Looks like you've added some nice defensive programming to the state handling! A couple of quick thoughts:
Would you be interested in exploring that logging addition? It could help with debugging unexpected state scenarios. Cheers! 🚀 ✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (1)
299-304: LGTM! The state handling improvements look solid.The changes make the state migration more robust by filtering out non-scalar values. The variable rename from
substream_statetosubstream_state_valuesbetter reflects its purpose.What do you think about adding a debug log when filtering out a state value? This could help with troubleshooting, wdyt? 🤔
substream_state_values = list(stream_state.values()) substream_state = substream_state_values[0] if substream_state_values else {} # Filter out per partition state. Because we pass the state to the parent stream in the format {cursor_field: substream_state} if isinstance(substream_state, (list, dict)): + self.logger.debug(f"Filtering out non-scalar state value: {substream_state}") substream_state = {}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py(1 hunks)unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1)
405-512: Great test coverage! 🎯The test cases thoroughly validate the state migration logic with well-structured parameterized tests. The test IDs and docstring provide clear documentation of what's being tested.
|
I found this error in CATs for Jira: The parent state was missing, causing the substream partition router to attempt migrating the substream state for the parent stream. However, it should only be migrated if it is in the legacy format. We didn’t encounter errors related to this issue previously because the parent state was updated during the sync. In contrast, with the concurrent per-partition cursor, the parent state is updated at the end of the sync, which created this corner case where the parent state is missing. With the fix, tests are successful. |
Aldo Gonzalez (aldogonzalez8)
left a comment
There was a problem hiding this comment.
APPROVED
Summary by CodeRabbit
Bug Fixes
Tests